package com.eshipenet.shipnet.equipmentextbuilder

import java.util.Properties

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import play.api.libs.json.{JsObject, JsValue, Json}

import scala.util.Try

/** Factory for `DataStream[JsValue]` sources: a Kafka-backed one and an empty one. */
object EquipmentLinkActionJsonStreamProvider {

  /** Builds a stream of parsed JSON values read from Kafka.
    *
    * The consumer is configured to start from the earliest offset. Each record's
    * `value` is passed through `Json.parse`; records whose parsing throws are
    * dropped from the resulting stream.
    *
    * @param env the execution environment the Kafka source is attached to
    * @return a stream containing only the successfully parsed JSON values
    */
  def kafkaSource(env: StreamExecutionEnvironment): DataStream[JsValue] = {
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka-server:9092")
    kafkaProps.setProperty("group.id", "equipmentextbuilder.EquipmentLink")

    val deserializer = new JsonKafkaDeserializer()
    val kafkaConsumer = new FlinkKafkaConsumer[RawKafkaMessage](
      "shipnettestv3.shipnettestv3.equipment_equipmentmodel_link",
      deserializer,
      kafkaProps
    )
    kafkaConsumer.setStartFromEarliest()

    env
      .addSource(kafkaConsumer)
      // Capture parse errors as Failure instead of letting them escape the operator.
      .map { record =>
        Try(Json.parse(record.value))
      }
      .filter(attempt => attempt.isSuccess)
      // Only Success values pass the filter above, so `get` cannot throw here.
      .map(attempt => attempt.get)
  }

  /** Builds a stream from an empty list.
    *
    * @param env the execution environment the collection source is attached to
    * @return a `DataStream[JsValue]` backed by an empty collection
    */
  def listSource(env: StreamExecutionEnvironment): DataStream[JsValue] =
    env.fromCollection(List.empty[JsValue])

}
